{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Celebrity Quote Analysis with The Cognitive Services on Spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/SparkSummit2/cog_services.png\" width=\"800\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from mmlspark import *\n",
    "from pyspark.ml import PipelineModel\n",
    "from pyspark.sql.functions import col, udf\n",
    "from pyspark.ml.feature import SQLTransformer\n",
    "import os\n",
    "\n",
    "#put your service keys here\n",
    "TEXT_API_KEY          = os.environ[\"TEXT_API_KEY\"]\n",
    "VISION_API_KEY        = os.environ[\"VISION_API_KEY\"]\n",
    "BING_IMAGE_SEARCH_KEY = os.environ[\"BING_IMAGE_SEARCH_KEY\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Extracting celebrity quote images using Bing Image Search on Spark\n",
    "\n",
    "Here we define two Transformers to extract celebrity quote images.\n",
    "\n",
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/Cog%20Service%20NB/step%201.png\" width=\"600\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "imgsPerBatch = 10 #the number of images Bing will return for each query\n",
    "offsets = [(i*imgsPerBatch,) for i in range(100)] # A list of offsets, used to page into the search results\n",
    "bingParameters = spark.createDataFrame(offsets, [\"offset\"])\n",
    "\n",
    "bingSearch = BingImageSearch()\\\n",
    "  .setSubscriptionKey(BING_IMAGE_SEARCH_KEY)\\\n",
    "  .setOffsetCol(\"offset\")\\\n",
    "  .setQuery(\"celebrity quotes\")\\\n",
    "  .setCount(imgsPerBatch)\\\n",
    "  .setOutputCol(\"images\")\n",
    "\n",
    "#Transformer to that extracts and flattens the richly structured output of Bing Image Search into a simple URL column\n",
    "getUrls = BingImageSearch.getUrlTransformer(\"images\", \"url\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Recognizing Images of Celebrities\n",
    "This block identifies the name of the celebrities for each of the images returned by the Bing Image Search.\n",
    "\n",
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/Cog%20Service%20NB/step%202.png\" width=\"600\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "celebs = RecognizeDomainSpecificContent()\\\n",
    "          .setSubscriptionKey(VISION_API_KEY)\\\n",
    "          .setModel(\"celebrities\")\\\n",
    "          .setUrl(\"https://eastus.api.cognitive.microsoft.com/vision/v2.0/\")\\\n",
    "          .setImageUrlCol(\"url\")\\\n",
    "          .setOutputCol(\"celebs\")\n",
    "\n",
    "#Extract the first celebrity we see from the structured response\n",
    "firstCeleb = SQLTransformer(statement=\"SELECT *, celebs.result.celebrities[0].name as firstCeleb FROM __THIS__\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Reading the quote from the image.\n",
    "This stage performs OCR on the images to recognize the quotes.\n",
    "\n",
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/Cog%20Service%20NB/step%203.png\" width=\"600\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "recognizeText = RecognizeText()\\\n",
    "  .setSubscriptionKey(VISION_API_KEY)\\\n",
    "  .setUrl(\"https://eastus.api.cognitive.microsoft.com/vision/v2.0/recognizeText\")\\\n",
    "  .setImageUrlCol(\"url\")\\\n",
    "  .setMode(\"Printed\")\\\n",
    "  .setOutputCol(\"ocr\")\\\n",
    "  .setConcurrency(5)\n",
    "\n",
    "def getTextFunction(ocrRow):\n",
    "    if ocrRow is None: return None\n",
    "    return \"\\n\".join([line.text for line in ocrRow.recognitionResult.lines])\n",
    "\n",
    "# this transformer wil extract a simpler string from the structured output of recognize text\n",
    "getText = UDFTransformer().setUDF(udf(getTextFunction)).setInputCol(\"ocr\").setOutputCol(\"text\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Understanding the Sentiment of the Quote\n",
    "\n",
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/Cog%20Service%20NB/step%204.png\" width=\"600\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sentimentTransformer = TextSentiment()\\\n",
    "    .setTextCol(\"text\")\\\n",
    "    .setUrl(\"https://eastus.api.cognitive.microsoft.com/text/analytics/v2.0/sentiment\")\\\n",
    "    .setSubscriptionKey(TEXT_API_KEY)\\\n",
    "    .setOutputCol(\"sentiment\")\n",
    "\n",
    "#Extract the sentiment score from the API response body\n",
    "getSentiment = SQLTransformer(statement=\"SELECT *, sentiment[0].score as sentimentScore FROM __THIS__\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Tying it all together\n",
    "\n",
    "Now that we have built the stages of our pipeline its time to chain them together into a single model that can be used to process batches of incoming data\n",
    "\n",
    "<img src=\"https://mmlspark.blob.core.windows.net/graphics/Cog%20Service%20NB/full%20pipe.png\" width=\"800\" style=\"float: center;\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Select the final coulmns\n",
    "cleanupColumns = SelectColumns().setCols([\"url\", \"firstCeleb\", \"text\", \"sentimentScore\"])\n",
    "\n",
    "celebrityQuoteAnalysis = PipelineModel(stages=[\n",
    "  bingSearch, getUrls, celebs, firstCeleb, recognizeText, getText, sentimentTransformer, getSentiment, cleanupColumns])\n",
    "\n",
    "celebrityQuoteAnalysis.transform(bingParameters).show(5)"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "name": "Cognitive Services on Spark"
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
